Skip to content

[flink] Decouple Committer.Context from operator-only assumptions#8221

Merged
JingsongLi merged 5 commits into
apache:masterfrom
ifndef-SleePy:coordinator-commit-pr1-state-store-refactor
Jun 23, 2026
Merged

[flink] Decouple Committer.Context from operator-only assumptions#8221
JingsongLi merged 5 commits into
apache:masterfrom
ifndef-SleePy:coordinator-commit-pr1-state-store-refactor

Conversation

@ifndef-SleePy

Copy link
Copy Markdown
Contributor

Purpose

Tracked under: #8220

Remove the assumption that Committer always runs inside a Flink operator, so the same committer logic can be hosted by either an operator or a OperatorCoordinator (introduced in a follow-up PR under #8220 ).

This PR is a pure refactor — no behavior change.

Changes

  • Introduce a StateStore abstraction for committer state access, with operator-backed and memory-backed implementations.
  • Route Committer.Context state access through StateStore; decouple CommittableStateManager from Flink's state initialization context.
  • Move the listener-state snapshot into StoreCommitter.snapshotState, so the committer owns its state lifecycle end-to-end.
  • Relax the metric group exposed via Committer.Context so it is not tied to OperatorMetricGroup, and adapt CommitterMetrics accordingly.

Tests

Existing committer / sink unit tests pass unchanged.

liubiao.leo added 5 commits May 29, 2026 20:41
Adds the StateStore interface and an OperatorStateStore-backed implementation; no existing code path is touched yet.
Replaces direct OperatorStateStore usage in Committer.Context and the commit listeners with the new StateStore, and adds a no-op default Committer.snapshotState().
Interface now takes Committer.Context and committables instead of Flink state contexts; implementations acquire state via Committer.Context.stateStore().
CommitterOperator now invokes committer.snapshotState() at the snapshot boundary, and the listener-state persistence is relocated from groupByCheckpoint to the dedicated snapshotState override (forwarded by StoreMultiCommitter).
…text

Widens Committer.Context.metricGroup() to MetricGroup; CommitterMetrics adapts to either an OperatorMetricGroup (using its IO counters) or a plain MetricGroup (using local SimpleCounters).
@ifndef-SleePy ifndef-SleePy force-pushed the coordinator-commit-pr1-state-store-refactor branch from ba19a05 to 546a2dd Compare June 12, 2026 13:08
@ifndef-SleePy

Copy link
Copy Markdown
Contributor Author

Please take a look @JingsongLi .

BTW I couldn't reproduce the MySqlSyncTableActionITCase.testAllTypes failure locally — could you help re-run that job?

@JingsongLi

Copy link
Copy Markdown
Contributor

+1

@JingsongLi JingsongLi merged commit 8fb1bc5 into apache:master Jun 23, 2026
11 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants